style: fix ruff format violations in PR #6660 context compression files#6737
style: fix ruff format violations in PR #6660 context compression files#6737rin259 wants to merge 2 commits intoAstrBotDevs:masterfrom
Conversation
- Token 估算算法改进(中英数特字符分别计算) - 添加 Token 计数缓存和摘要缓存 - ContextManager 添加指纹机制减少重复计算 - 修复缓存键碰撞和 overhead 重复计算 bug
|
Warning Gemini encountered an error creating the summary. You can try again by commenting |
There was a problem hiding this comment.
Hey - I've found 4 issues, and left some high level feedback:
- The PR title/description indicate a formatting-only change, but the diff introduces substantial new logic (caching, stats, compressors, tests); consider updating the PR metadata and possibly splitting functional changes from formatting to keep the history clear.
- ContextManager and other code paths are directly calling private methods/attributes (e.g. token_counter._get_cache_key, _generate_summary_cache_key, private cache internals) which couples components tightly; consider promoting these to public APIs or introducing explicit interfaces instead of reaching into private members.
- In ContextManager.process, the fingerprint used for cache reuse is computed from the original messages argument while token counting and compression decisions operate on result (post-truncation/history changes), which can lead to stale or incorrect cache hits; consider basing the fingerprint on the actual list being token-counted.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The PR title/description indicate a formatting-only change, but the diff introduces substantial new logic (caching, stats, compressors, tests); consider updating the PR metadata and possibly splitting functional changes from formatting to keep the history clear.
- ContextManager and other code paths are directly calling private methods/attributes (e.g. token_counter._get_cache_key, _generate_summary_cache_key, private cache internals) which couples components tightly; consider promoting these to public APIs or introducing explicit interfaces instead of reaching into private members.
- In ContextManager.process, the fingerprint used for cache reuse is computed from the original messages argument while token counting and compression decisions operate on result (post-truncation/history changes), which can lead to stale or incorrect cache hits; consider basing the fingerprint on the actual list being token-counted.
## Individual Comments
### Comment 1
<location path="astrbot/core/agent/context/manager.py" line_range="60-61" />
<code_context>
+ if not messages:
+ return 0
+
+ # 使用 token counter 的缓存键作为指纹
+ return self.token_counter._get_cache_key(messages)
+
async def process(
</code_context>
<issue_to_address>
**suggestion:** Using a private method for the fingerprint couples ContextManager tightly to token_counter internals.
Since this calls `self.token_counter._get_cache_key(messages)`, it depends on a private API that may change when the token counter is refactored (e.g., cache key structure or implementation). Please either add and use a public `get_cache_key`/`fingerprint` on the token counter, or compute the fingerprint here using only the public `Message` interface to keep the abstraction boundary clean.
Suggested implementation:
```python
# 使用 token counter 的公开缓存键方法作为指纹
return self.token_counter.get_cache_key(messages)
```
You will also need to:
1. Add a public method (e.g. `get_cache_key`) on the token counter class, most likely in `astrbot/core/agent/context/token_counter.py`, with a signature like:
```python
def get_cache_key(self, messages: list[Message]) -> int:
return self._get_cache_key(messages)
```
2. Ensure that this new public method is part of the token counter's stable API and replace any other direct usages of `_get_cache_key` (if present) with `get_cache_key` for consistency and to keep the abstraction boundary clean.
</issue_to_address>
### Comment 2
<location path="astrbot/core/agent/context/compressor.py" line_range="149-158" />
<code_context>
return system_messages, messages_to_summarize, recent_messages
+def _generate_summary_cache_key(messages: list[Message]) -> str:
+ """Generate a cache key for summary based on full history.
+
+ Uses role and content from all messages to create a collision-resistant key.
+ """
+ if not messages:
+ return ""
+
+ key_parts = []
+ for msg in messages:
+ content = msg.content if isinstance(msg.content, str) else str(msg.content)
+ key_parts.append(f"{msg.role}:{content[:50]}")
+
+ return "|".join(key_parts)
</code_context>
<issue_to_address>
**issue (bug_risk):** Summary cache key truncation to 50 characters can cause collisions and incorrect cache hits.
Because the key uses `content[:50]` per message, different histories that share the same first 50 characters will collide and reuse the same summary, returning stale or incorrect results. Consider using a hash of the full content (e.g. `hashlib.sha1(content.encode()).hexdigest()`), optionally combined with message count and roles, or at least a longer prefix plus the full length to reduce collision risk.
</issue_to_address>
### Comment 3
<location path="tests/test_context_compression.py" line_range="355-356" />
<code_context>
+ assert self.manager._last_messages_fingerprint is None
+
+
+if __name__ == "__main__":
+ pytest.main([__file__, "-v"])
</code_context>
<issue_to_address>
**nitpick (testing):** Remove the __main__ block; pytest will discover these tests automatically
This `if __name__ == "__main__": pytest.main(...)` block isn’t needed in a pytest test module and may interfere with tooling. Since pytest will automatically discover and run these tests, please remove it to keep the file simpler.
</issue_to_address>
### Comment 4
<location path="astrbot/core/agent/context/manager.py" line_range="50" />
<code_context>
truncate_turns=config.truncate_turns
)
+ # 缓存上一次计算的消息指纹和 token 数
+ self._last_messages_fingerprint: int | None = None
+ self._last_token_count: int | None = None
</code_context>
<issue_to_address>
**issue (complexity):** Consider removing the new caching/fingerprinting logic from ContextManager and delegating all token-count caching to EstimateTokenCounter while preserving compression behavior and stats.
You can simplify the new logic by dropping the duplicated caching/fingerprinting in `ContextManager` and relying solely on `EstimateTokenCounter`’s internal cache, while keeping compression behavior and stats.
### 1. Remove redundant cache state and private‑method coupling
Remove the extra fields and `_get_messages_fingerprint`, and their usages:
```python
class ContextManager:
def __init__(self, config: ContextConfig) -> None:
...
# Remove these:
# self._last_messages_fingerprint: int | None = None
# self._last_token_count: int | None = None
self._compression_count = 0
# Remove this method entirely
# def _get_messages_fingerprint(self, messages: list[Message]) -> int:
# ...
```
### 2. Simplify `process` to always use `token_counter.count_tokens`
This keeps all behavior (including `trusted_token_usage`) but delegates caching to `EstimateTokenCounter`:
```python
async def process(
self, messages: list[Message], trusted_token_usage: int = 0
) -> list[Message]:
try:
result = messages
if self.config.enforce_max_turns != -1:
result = self.truncator.truncate_by_turns(
result,
keep_most_recent_turns=self.config.enforce_max_turns,
drop_turns=self.config.truncate_turns,
)
if self.config.max_context_tokens > 0:
if trusted_token_usage > 0:
total_tokens = trusted_token_usage
else:
total_tokens = self.token_counter.count_tokens(result)
if self.compressor.should_compress(
result, total_tokens, self.config.max_context_tokens
):
result = await self._run_compression(result, total_tokens)
return result
except Exception as e:
logger.error(f"Error during context processing: {e}", exc_info=True)
return messages
```
### 3. Simplify `_run_compression` and avoid manual cache updates
Just count tokens via the counter; its cache will make repeated calls cheap:
```python
async def _run_compression(
self, messages: list[Message], prev_tokens: int
) -> list[Message]:
logger.debug("Compress triggered, starting compression...")
self._compression_count += 1
messages = await self.compressor(messages)
tokens_after_compression = self.token_counter.count_tokens(messages)
compress_rate = (
tokens_after_compression / self.config.max_context_tokens
) * 100
logger.info(
f"Compress #{self._compression_count} completed."
f" {prev_tokens} -> {tokens_after_compression} tokens,"
f" compression rate: {compress_rate:.2f}%.",
)
if self.compressor.should_compress(
messages, tokens_after_compression, self.config.max_context_tokens
):
logger.info(
"Context still exceeds max tokens after compression, "
"applying halving truncation..."
)
messages = self.truncator.truncate_by_halving(messages)
return messages
```
### 4. Keep stats, but avoid parallel cache tracking
You can still expose `compression_count` and use the token counter’s own stats, without tracking a parallel cache in `ContextManager`:
```python
def get_stats(self) -> dict:
stats = {
"compression_count": self._compression_count,
}
if hasattr(self.token_counter, "get_cache_stats"):
stats["token_counter_cache"] = self.token_counter.get_cache_stats()
return stats
def reset_stats(self) -> None:
self._compression_count = 0
if hasattr(self.token_counter, "clear_cache"):
self.token_counter.clear_cache()
```
This retains all functional behavior (compression flow, logging, `trusted_token_usage`, observability) but removes duplicated caching logic, state sync branches, and reliance on a private `_get_cache_key` method.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| # 使用 token counter 的缓存键作为指纹 | ||
| return self.token_counter._get_cache_key(messages) |
There was a problem hiding this comment.
suggestion: Using a private method for the fingerprint couples ContextManager tightly to token_counter internals.
Since this calls self.token_counter._get_cache_key(messages), it depends on a private API that may change when the token counter is refactored (e.g., cache key structure or implementation). Please either add and use a public get_cache_key/fingerprint on the token counter, or compute the fingerprint here using only the public Message interface to keep the abstraction boundary clean.
Suggested implementation:
# 使用 token counter 的公开缓存键方法作为指纹
return self.token_counter.get_cache_key(messages)You will also need to:
- Add a public method (e.g.
get_cache_key) on the token counter class, most likely inastrbot/core/agent/context/token_counter.py, with a signature like:def get_cache_key(self, messages: list[Message]) -> int: return self._get_cache_key(messages)
- Ensure that this new public method is part of the token counter's stable API and replace any other direct usages of
_get_cache_key(if present) withget_cache_keyfor consistency and to keep the abstraction boundary clean.
| def _generate_summary_cache_key(messages: list[Message]) -> str: | ||
| """Generate a cache key for summary based on full history. | ||
|
|
||
| Uses role and content from all messages to create a collision-resistant key. | ||
| """ | ||
| if not messages: | ||
| return "" | ||
|
|
||
| key_parts = [] | ||
| for msg in messages: |
There was a problem hiding this comment.
issue (bug_risk): Summary cache key truncation to 50 characters can cause collisions and incorrect cache hits.
Because the key uses content[:50] per message, different histories that share the same first 50 characters will collide and reuse the same summary, returning stale or incorrect results. Consider using a hash of the full content (e.g. hashlib.sha1(content.encode()).hexdigest()), optionally combined with message count and roles, or at least a longer prefix plus the full length to reduce collision risk.
| if __name__ == "__main__": | ||
| pytest.main([__file__, "-v"]) |
There was a problem hiding this comment.
nitpick (testing): Remove the main block; pytest will discover these tests automatically
This if __name__ == "__main__": pytest.main(...) block isn’t needed in a pytest test module and may interfere with tooling. Since pytest will automatically discover and run these tests, please remove it to keep the file simpler.
| truncate_turns=config.truncate_turns | ||
| ) | ||
|
|
||
| # 缓存上一次计算的消息指纹和 token 数 |
There was a problem hiding this comment.
issue (complexity): Consider removing the new caching/fingerprinting logic from ContextManager and delegating all token-count caching to EstimateTokenCounter while preserving compression behavior and stats.
You can simplify the new logic by dropping the duplicated caching/fingerprinting in ContextManager and relying solely on EstimateTokenCounter’s internal cache, while keeping compression behavior and stats.
1. Remove redundant cache state and private‑method coupling
Remove the extra fields and _get_messages_fingerprint, and their usages:
class ContextManager:
def __init__(self, config: ContextConfig) -> None:
...
# Remove these:
# self._last_messages_fingerprint: int | None = None
# self._last_token_count: int | None = None
self._compression_count = 0
# Remove this method entirely
# def _get_messages_fingerprint(self, messages: list[Message]) -> int:
# ...2. Simplify process to always use token_counter.count_tokens
This keeps all behavior (including trusted_token_usage) but delegates caching to EstimateTokenCounter:
async def process(
self, messages: list[Message], trusted_token_usage: int = 0
) -> list[Message]:
try:
result = messages
if self.config.enforce_max_turns != -1:
result = self.truncator.truncate_by_turns(
result,
keep_most_recent_turns=self.config.enforce_max_turns,
drop_turns=self.config.truncate_turns,
)
if self.config.max_context_tokens > 0:
if trusted_token_usage > 0:
total_tokens = trusted_token_usage
else:
total_tokens = self.token_counter.count_tokens(result)
if self.compressor.should_compress(
result, total_tokens, self.config.max_context_tokens
):
result = await self._run_compression(result, total_tokens)
return result
except Exception as e:
logger.error(f"Error during context processing: {e}", exc_info=True)
return messages3. Simplify _run_compression and avoid manual cache updates
Just count tokens via the counter; its cache will make repeated calls cheap:
async def _run_compression(
self, messages: list[Message], prev_tokens: int
) -> list[Message]:
logger.debug("Compress triggered, starting compression...")
self._compression_count += 1
messages = await self.compressor(messages)
tokens_after_compression = self.token_counter.count_tokens(messages)
compress_rate = (
tokens_after_compression / self.config.max_context_tokens
) * 100
logger.info(
f"Compress #{self._compression_count} completed."
f" {prev_tokens} -> {tokens_after_compression} tokens,"
f" compression rate: {compress_rate:.2f}%.",
)
if self.compressor.should_compress(
messages, tokens_after_compression, self.config.max_context_tokens
):
logger.info(
"Context still exceeds max tokens after compression, "
"applying halving truncation..."
)
messages = self.truncator.truncate_by_halving(messages)
return messages4. Keep stats, but avoid parallel cache tracking
You can still expose compression_count and use the token counter’s own stats, without tracking a parallel cache in ContextManager:
def get_stats(self) -> dict:
stats = {
"compression_count": self._compression_count,
}
if hasattr(self.token_counter, "get_cache_stats"):
stats["token_counter_cache"] = self.token_counter.get_cache_stats()
return stats
def reset_stats(self) -> None:
self._compression_count = 0
if hasattr(self.token_counter, "clear_cache"):
self.token_counter.clear_cache()This retains all functional behavior (compression flow, logging, trusted_token_usage, observability) but removes duplicated caching logic, state sync branches, and reliance on a private _get_cache_key method.
Summary
Fixes the format-check CI failure blocking PR #6660.
Changes
Ruff format fixes for 3 files in the context compression PR:
astrbot/core/agent/context/compressor.pyastrbot/core/agent/context/manager.pyastrbot/core/agent/context/token_counter.pyFixes:
Files changed
All changes are purely cosmetic/formatting.
Note: This PR fixes the format-check CI failure on PR #6660 (
perf/context-compression-v2). Merge after #6660 or merge both together.Summary by Sourcery
Optimize context compression pipeline with token counting, caching, and summary reuse improvements, and add coverage tests for the new behavior.
New Features:
Enhancements:
Tests: